<?php

namespace App\Process;

use App\Model\FormidsModel;
use App\Model\RecordsModel;
use App\Utility\RedisTools;
use Carbon\Carbon;
use EasySwoole\EasySwoole\Logger;
use EasySwoole\EasySwoole\Swoole\Process\AbstractProcess;
use Swoole\Process;

/**
 * Custom process that consumes the "message-openid" Redis queue.
 *
 * Each popped value is used as a template id: the non-expired form ids are
 * loaded, the matching record row is updated with the recipient count and
 * start time, and the recipients are pushed in chunks onto the
 * "message-create" queue.
 */
class MessageOpenid extends AbstractProcess
{
    /**
     * True while a drain coroutine is active, so that overlapping ticks do not
     * start a second consumer.
     *
     * @var bool
     */
    private $isRun = false;

    /**
     * Registers a 100 ms tick that starts a coroutine to drain the queue
     * whenever no drain is currently in progress.
     *
     * @param Process $process The process handle passed in by the framework (unused here).
     */
    public function run(Process $process){
        $this->addTick(100, function () {
            if ($this->isRun) {
                return;
            }
            $this->isRun = true;
            try {
                $queue = new RedisTools('message-openid');
                \Co::create(function () use ($queue) {
                    try {
                        $this->drainQueue($queue);
                    } finally {
                        // Always release the guard, whatever way the drain ended,
                        // so the next tick can start a new drain.
                        $this->isRun = false;
                    }
                });
            } catch (\Throwable $throwable) {
                // Creating the queue client or the coroutine failed: release the
                // guard, otherwise no later tick would ever consume again.
                $this->isRun = false;
                Logger::getInstance()->log(
                    'MessageOpenid: failed to start queue consumer: ' . $throwable->getMessage()
                );
            }
        });
    }

    /**
     * Pops tasks from the queue until it is empty or a task fails.
     *
     * A failure is logged and ends the current drain; the next tick starts a
     * new one. The task that failed has already been popped and is not
     * re-queued.
     *
     * @param RedisTools $queue Client bound to the "message-openid" queue.
     */
    private function drainQueue(RedisTools $queue): void
    {
        while (true) {
            try {
                // Original author's note: rPop consumes in FIFO order, lPop would not
                // (presumably because producers lPush — verify against the producer).
                $task = $queue->rPop();
                if (!$task) {
                    break;
                }
                $this->dispatchTask($task);
            } catch (\Throwable $throwable) {
                Logger::getInstance()->log(
                    'MessageOpenid: task processing failed: ' . $throwable->getMessage()
                );
                break;
            }
        }
    }

    /**
     * Fans one task out to the "message-create" queue.
     *
     * @param mixed $task Value popped from the queue; used as the template id.
     */
    private function dispatchTask($task): void
    {
        // Load the form ids whose expires_at is more than 600 seconds in the
        // future, grouped by openid.
        $model = new FormidsModel();
        $expre = time() + 600;
        $list = $model->select('id,openid,formid')
            ->where('expires_at', $expre, '>')
            ->groupBy('openid')
            ->getAll();
        if (!$list) {
            return;
        }

        $recordModel = new RecordsModel();
        $record = $recordModel->where('template_id', $task)
            ->orderBy('id')->first();
        if (!$record) {
            // Without a record row there is no id to update or to attach to the
            // downstream jobs, so skip the task instead of pushing a null record_id.
            Logger::getInstance()->log(
                'MessageOpenid: no record found for template_id ' . json_encode($task)
            );
            return;
        }

        $recordModel->update(['id' => $record['id'], 'number' => count($list), 'start_at' => Carbon::now()]);

        // Push the recipients in chunks of 1000 so each downstream job stays bounded.
        $queue_push = new RedisTools('message-create');
        foreach (array_chunk($list, 1000) as $item) {
            $queue_push->lPush(['item' => $item, 'template_id' => $task, 'record_id' => $record['id']]);
        }
    }

    /**
     * Shutdown hook; intentionally does nothing.
     */
    public function onShutDown()
    {
        // TODO: Implement onShutDown() method.
    }

    /**
     * Message hook; intentionally does nothing.
     *
     * @param string $str     Received payload.
     * @param mixed  ...$args Additional arguments supplied by the framework.
     */
    public function onReceive(string $str, ...$args)
    {
        // TODO: Implement onReceive() method.
    }
}
